From 5834a76703c036e7d07bcb8c1da734b709862974 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Sat, 28 Jun 2014 21:30:44 -0700 Subject: [PATCH] Implement parallel compilation This commit implements the -j flag in the `cargo_rustc` module, using the primitives from the previous commits. The default parllelism is `os::num_cpus()` This change also brings proper freshness propagation instead of the hokey logic of once something is compiled, "compile everything to the right as well". --- src/cargo/lib.rs | 2 +- src/cargo/ops/cargo_rustc.rs | 155 ++++++++++++++++++++--------- src/cargo/util/dependency_queue.rs | 5 + src/cargo/util/errors.rs | 2 +- 4 files changed, 116 insertions(+), 48 deletions(-) diff --git a/src/cargo/lib.rs b/src/cargo/lib.rs index 32a45e48b..314f9d0ad 100644 --- a/src/cargo/lib.rs +++ b/src/cargo/lib.rs @@ -1,7 +1,7 @@ #![crate_id="cargo"] #![crate_type="rlib"] -#![feature(macro_rules,phase)] +#![feature(macro_rules, phase)] extern crate debug; extern crate term; diff --git a/src/cargo/ops/cargo_rustc.rs b/src/cargo/ops/cargo_rustc.rs index 9b4489d79..00798671b 100644 --- a/src/cargo/ops/cargo_rustc.rs +++ b/src/cargo/ops/cargo_rustc.rs @@ -1,14 +1,15 @@ -use std::os::args; -use std::io; +use std::hash::Hasher; +use std::hash::sip::SipHasher; use std::io::{File, IoError}; +use std::io; +use std::os::args; use std::str; -use std::hash::sip::SipHasher; -use std::hash::Hasher; +use term::color::YELLOW; use core::{Package, PackageSet, Target}; use util; use util::{CargoResult, ChainError, ProcessBuilder, internal, human, CargoError}; -use util::{Config}; +use util::{Config, TaskPool, DependencyQueue, Fresh, Dirty, Freshness}; type Args = Vec; @@ -17,10 +18,11 @@ struct Context<'a, 'b> { deps_dir: &'a Path, primary: bool, rustc_version: &'a str, - compiled_anything: bool, config: &'b mut Config<'b> } +type Job = proc():Send -> CargoResult<()>; + pub fn compile_targets<'a>(targets: &[&Target], pkg: &Package, deps: &PackageSet, config: &'a mut Config<'a>) -> CargoResult<()> { @@ -53,33 +55,38 @@ pub fn compile_targets<'a>(targets: &[&Target], pkg: &Package, deps: &PackageSet deps_dir: &deps_target_dir, primary: false, rustc_version: rustc_version.as_slice(), - compiled_anything: false, config: config }; - // Traverse the dependencies in topological order - for dep in try!(topsort(deps)).iter() { + // Build up a list of pending jobs, each of which represent compiling a + // particular package. No actual work is executed as part of this, that's + // all done later as part of the `execute` function which will run + // everything in order with proper parallelism. + let mut jobs = Vec::new(); + for dep in deps.iter() { + // Only compile lib targets for dependencies let targets = dep.get_targets().iter().filter(|target| { - // Only compile lib targets for dependencies target.is_lib() && target.get_profile().is_compile() }).collect::>(); - try!(compile(targets.as_slice(), dep, &mut cx)); + jobs.push((dep, + try!(compile(targets.as_slice(), dep, &mut cx)))); } cx.primary = true; cx.dest = &target_dir; + jobs.push((pkg, try!(compile(targets, pkg, &mut cx)))); - try!(compile(targets, pkg, &mut cx)); - - Ok(()) + // Now that we've figured out everything that we're going to do, do it! + execute(cx.config, jobs) } -fn compile(targets: &[&Target], pkg: &Package, cx: &mut Context) -> CargoResult<()> { +fn compile(targets: &[&Target], pkg: &Package, + cx: &mut Context) -> CargoResult<(Freshness, Job)> { debug!("compile_pkg; pkg={}; targets={}", pkg, pkg.get_targets()); if targets.is_empty() { - return Ok(()); + return Ok((Fresh, proc() Ok(()))) } // First check to see if this package is fresh. @@ -96,35 +103,39 @@ fn compile(targets: &[&Target], pkg: &Package, cx: &mut Context) -> CargoResult< // TODO: Figure out how this works with targets let fingerprint_loc = cx.dest.join(format!(".{}.fingerprint", pkg.get_name())); - let (is_fresh, fingerprint) = try!(is_fresh(pkg, &fingerprint_loc, cx, targets)); - if !cx.compiled_anything && is_fresh { - try!(cx.config.shell().status("Fresh", pkg)); - return Ok(()) - } + let (is_fresh, fingerprint) = try!(is_fresh(pkg, &fingerprint_loc, cx, + targets)); - // Alright, so this package is not fresh and we need to compile it. Start - // off by printing a nice helpful message and then run the custom build - // command if one is present. - try!(cx.config.shell().status("Compiling", pkg)); + let mut cmds = Vec::new(); // TODO: Should this be on the target or the package? match pkg.get_manifest().get_build() { - Some(cmd) => try!(compile_custom(pkg, cmd, cx)), + Some(cmd) => cmds.push(compile_custom(pkg, cmd, cx)), None => {} } // After the custom command has run, execute rustc for all targets of our // package. for &target in targets.iter() { - try!(rustc(&pkg.get_root(), target, cx)); + cmds.push(rustc(&pkg.get_root(), target, cx)); } - // Now that everything has successfully compiled, write our new fingerprint - // to the relevant location to prevent recompilations in the future. - try!(File::create(&fingerprint_loc).write_str(fingerprint.as_slice())); - cx.compiled_anything = true; + cmds.push(proc() { + // If this job runs, then everything has successfully compiled, so write + // our new fingerprint to the relevant location to prevent + // recompilations in the future. + try!(File::create(&fingerprint_loc).write_str(fingerprint.as_slice())); + Ok(()) + }); - Ok(()) + // TODO: this job itself may internally be parallel, but we're hiding that + // currently. How to expose the parallelism among a single target? + Ok((if is_fresh {Fresh} else {Dirty}, proc() { + for cmd in cmds.move_iter() { + try!(cmd()); + } + Ok(()) + })) } fn is_fresh(dep: &Package, loc: &Path, @@ -163,7 +174,8 @@ fn mk_target(target: &Path) -> Result<(), IoError> { io::fs::mkdir_recursive(target, io::UserRWX) } -fn compile_custom(pkg: &Package, cmd: &str, cx: &Context) -> CargoResult<()> { +fn compile_custom(pkg: &Package, cmd: &str, + cx: &Context) -> Job { // FIXME: this needs to be smarter about splitting let mut cmd = cmd.split(' '); let mut p = util::process(cmd.next().unwrap()) @@ -174,10 +186,11 @@ fn compile_custom(pkg: &Package, cmd: &str, cx: &Context) -> CargoResult<()> { for arg in cmd { p = p.arg(arg); } - p.exec_with_output().map(|_| ()).map_err(|e| e.mark_human()) + proc() p.exec_with_output().map(|_| ()).map_err(|e| e.mark_human()) } -fn rustc(root: &Path, target: &Target, cx: &Context) -> CargoResult<()> { +fn rustc(root: &Path, target: &Target, + cx: &Context) -> Job { let crate_types = target.rustc_crate_types(); @@ -185,15 +198,18 @@ fn rustc(root: &Path, target: &Target, cx: &Context) -> CargoResult<()> { root.display(), target, crate_types, cx.dest.display(), cx.deps_dir.display(), cx.primary); + let primary = cx.primary; let rustc = prepare_rustc(root, target, crate_types, cx); - try!(if cx.primary { - rustc.exec().map_err(|err| human(err.to_str())) - } else { - rustc.exec_with_output().and(Ok(())).map_err(|err| human(err.to_str())) - }); - - Ok(()) + proc() { + if primary { + rustc.exec().map_err(|err| human(err.to_str())) + } else { + rustc.exec_with_output().and(Ok(())).map_err(|err| { + human(err.to_str()) + }) + } + } } fn prepare_rustc(root: &Path, target: &Target, crate_types: Vec<&str>, @@ -237,9 +253,56 @@ fn build_deps_args(dst: &mut Args, cx: &Context) { dst.push(cx.deps_dir.display().to_str()); } -fn topsort(deps: &PackageSet) -> CargoResult { - match deps.sort() { - Some(deps) => Ok(deps), - None => return Err(internal("circular dependency detected")) +/// Execute all jobs necessary to build the dependency graph. +/// +/// This function will spawn off `config.jobs()` workers to build all of the +/// necessary dependencies, in order. Freshness is propagated as far as possible +/// along each dependency chain. +fn execute(config: &mut Config, + jobs: Vec<(&Package, (Freshness, Job))>) -> CargoResult<()> { + let pool = TaskPool::new(config.jobs()); + let (tx, rx) = channel(); + let mut queue = DependencyQueue::new(); + for (pkg, (fresh, job)) in jobs.move_iter() { + queue.enqueue(pkg, fresh, (pkg, job)); + } + + // Iteratively execute the dependency graph. Each turn of this loop will + // schedule as much work as possible and then wait for one job to finish, + // possibly scheduling more work afterwards. + let mut active = 0i; + while queue.len() > 0 { + loop { + match queue.dequeue() { + Some((name, Fresh, (pkg, _))) => { + try!(config.shell().status("Fresh", pkg)); + tx.send((name, Fresh, Ok(()))); + } + Some((name, Dirty, (pkg, job))) => { + try!(config.shell().status("Compiling", pkg)); + let my_tx = tx.clone(); + pool.execute(proc() my_tx.send((name, Dirty, job()))); + } + None => break, + } + } + + // Now that all possible work has been scheduled, wait for a piece of + // work to finish. If any package fails to build then we stop scheduling + // work as quickly as possibly. + active -= 1; + match rx.recv() { + (name, fresh, Ok(())) => queue.finish(&name, fresh), + (_, _, Err(e)) => { + if active > 0 && config.jobs() > 1 { + try!(config.shell().say("Build failed, waiting for other \ + jobs to finish...", YELLOW)); + for _ in rx.iter() {} + } + return Err(e) + } + } } + + Ok(()) } diff --git a/src/cargo/util/dependency_queue.rs b/src/cargo/util/dependency_queue.rs index b6761fc1b..689195b52 100644 --- a/src/cargo/util/dependency_queue.rs +++ b/src/cargo/util/dependency_queue.rs @@ -62,12 +62,17 @@ impl DependencyQueue { /// It is assumed that any dependencies of this package will eventually also /// be added to the dependency queue. pub fn enqueue(&mut self, pkg: &Package, fresh: Freshness, data: T) { + // ignore self-deps + if self.pkgs.contains_key(&pkg.get_name().to_str()) { return } + if fresh == Dirty { self.dirty.insert(pkg.get_name().to_str()); } let mut my_dependencies = HashSet::new(); for dep in pkg.get_dependencies().iter() { + if dep.get_name() == pkg.get_name() { continue } + let name = dep.get_name().to_str(); assert!(my_dependencies.insert(name.clone())); let rev = self.reverse_dep_map.find_or_insert(name, HashSet::new()); diff --git a/src/cargo/util/errors.rs b/src/cargo/util/errors.rs index e67f9c0cf..3d9953b04 100644 --- a/src/cargo/util/errors.rs +++ b/src/cargo/util/errors.rs @@ -62,7 +62,7 @@ macro_rules! from_error ( } ) -impl Show for Box { +impl Show for Box { fn fmt(&self, f: &mut Formatter) -> fmt::Result { try!(write!(f, "{}", self.description())); Ok(()) -- 2.30.2